package com.guo.mixframe.framework.receivers.rpc

import com.guo.mixframe.framework.configs.CoreConfig
import com.guo.mixframe.framework.exception.CoreException
import com.guo.mixframe.framework.exceptions.CustomException
import com.guo.mixframe.framework.log.LogPrint
import com.guo.mixframe.framework.protocol.RpcReqHeader
import com.guo.mixframe.framework.protocol.RpcRespHeader
import com.guo.mixframe.framework.utils.CrossServerHelper
import com.guo.mixframe.framework.utils.ProtobufUtils
import io.netty.buffer.ByteBuf
import io.netty.buffer.PooledByteBufAllocator
import io.protostuff.Tag
import io.vertx.core.AsyncResult
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.ext.web.client.HttpRequest
import io.vertx.ext.web.client.HttpResponse
import io.vertx.ext.web.client.WebClient
import io.vertx.kotlin.coroutines.awaitEvent
import java.util.concurrent.ConcurrentHashMap

/**
 * rpc客户端
 */
class RpcClient {

    class RpcGeneralReq {
        @Tag(1)
        var msg: String? = null
        @Tag(2)
        var zone: String? = null

        constructor()
        constructor(msg: String, zone: String) {
            this.msg = msg
            this.zone = zone
        }

    }

    class RpcGeneralResp {
        @Tag(1)
        var nickName: String? = null

        @Tag(2)
        var result: String? = null

        @Tag(3)
        var errorCode: Int = -1
    }

    companion object {
        val instance = RpcClient()

        /**
         * 发送给指定区服
         */
        suspend fun <T> rpcToZone(zone: String, url: String, req: Any?, clazz: Class<T>?, type: String? = null): T? {
            val ip = CrossServerHelper.getIPOfZone(zone)
            return rpc(ip, url, req, clazz, type)
        }

        /**
         * 发送给登录服
         */
        suspend fun <T> rpcToHost(url: String, req: Any?, clazz: Class<T>?, type: String? = null): T? {
            return rpc(CoreConfig.getInstance().checkLoginIp(), url, req, clazz, type)
        }

        /**
         * 发送消息给跨服（中心服）
         */
        suspend fun <T> rpcToCross(url: String, req: Any?, clazz: Class<T>?, type: String? = null): T? {
            return rpc(CoreConfig.getInstance().checkCrossIp(), url, req, clazz, type)
        }

        /**
         * 发送消息给资源服
         */
        suspend fun <T> rpcToResource(url: String, req: Any?, clazz: Class<T>?, type: String? = null): T? {
            return rpc(CoreConfig.getInstance().checkResourceIP(), url, req, clazz, type)
        }

        /**
         * 发送给指定ip
         */
        suspend fun <T> rpc(localIP: String, url: String, req: Any?, clazz: Class<T>?, type: String? = null): T? {
            val bytes = instance.sendRpc(localIP, url, req, type)
            if (clazz == null || clazz == Void::class.java || bytes == null) return null
            return ProtobufUtils.byteToBean(bytes, clazz)
        }
    }

    /**
     * 缓存下来的rpc请求，复用
     */
    private val httpCache = ConcurrentHashMap<String, HttpRequest<Buffer>>()

    /**
     * rpc端口
     */
    private val port = CoreConfig.getInstance().checkRpcPort()

    /**
     * 用来创建http请求
     */
    private lateinit var webClient: WebClient

    private var vertx: Vertx? = null

    fun init(vertx: Vertx) {
        this.vertx = vertx
        this.webClient = WebClient.create(vertx)
    }

    /**
     * 发送一个rpc请求，返回响应体的字节数组
     */
    private suspend fun sendRpc(ip: String, url: String, req: Any?, type: String?): ByteArray? {
        var client = httpCache[ip]
        if (client == null) {
            val newClient = createHttp(ip)
            client = httpCache.putIfAbsent(ip, newClient)
            if (client == null) {
                client = newClient
            }
        }
        val body = createBody(url, type, req)
        val ar = awaitEvent<AsyncResult<HttpResponse<Buffer>>> {
            client.sendBuffer(Buffer.buffer(body), it)
        }
        body.release()
        if (ar.succeeded()) {
            val responseByteBuf = ar.result().body().byteBuf
            return parseResponse(responseByteBuf, url)
        }
        throw CoreException().throwException(CoreException.RpcFailed)
    }

    /**
     * 创建一个http请求
     */
    private fun createHttp(ip: String): HttpRequest<Buffer> {
        return webClient.postAbs("http://$ip:$port")
    }

    /**
     * 创建rpc的请求内容，封装成vertx的Buffer
     */
    private fun createBody(url: String, type: String?, req: Any?): ByteBuf {
        val header = RpcReqHeader()
        header.url = url
        header.rpcType = type

        val headerBytes = ProtobufUtils.bean2Byte(header)

        var length = headerBytes.size + 4

        var bodyBytes: ByteArray? = null
        if (req != null) {
            bodyBytes = ProtobufUtils.bean2Byte(req)
            length += bodyBytes.size
        }

        val buf = PooledByteBufAllocator.DEFAULT.buffer(length)

        buf.writeIntLE(headerBytes.size)
        buf.writeBytes(headerBytes)

        if (bodyBytes != null) {
            buf.writeBytes(bodyBytes)
        }
        return buf
    }

    /**
     * 解析rpc响应，返回响应的字节数组，如果请求失败就抛出异常
     */
    private fun parseResponse(byteBuf: ByteBuf, url: String): ByteArray? {
        val headerLen = byteBuf.readIntLE()
        val headerBytes = ByteArray(headerLen)
        byteBuf.readBytes(headerBytes)

        val headerClass = RpcRespHeader::class.java
        val header = ProtobufUtils.byteToBean(headerBytes, headerClass)
        if (!header.isSuccess) {
            LogPrint.logger.debug("rpc请求报错: $url")
            throw CustomException(header.errorCode.toInt())
        }

        val bodyLen = byteBuf.readableBytes()
        if (bodyLen == 0) return null
        val bodyBytes = ByteArray(bodyLen)
        byteBuf.readBytes(bodyBytes)
        return bodyBytes
    }
}
